AbstractQueuedSynchronizer
- AbstractQueuedSynchronizer 类继承自 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 主要维护了 exclusiveOwnerThread (独占模式的线程)
- AbstractQueuedSynchronizer 为实现依赖 FIFO 等待队列的阻塞锁和相关的同步器提供一个框架.该类被设计为依赖单个原子int值表示状态的大多数同步器的一个有用的基础.子类必须定义改变该状态的受保护方法,以及这些方法定义了该状态在获取或释放该对象方面的含义.
- 使用这个类作为同步器的基础,实现如下方法,使用 getState、setState 或compareAndSetState 方式适当的获取或修改同步状态.
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
AbstractQueuedSynchronizer.Node 等待队列节点类
- 等待队列是 CLH 锁队列的一个变种
- Node 类的属性如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41class Node {
// 表示节点在共享模式下等待
static final Node SHARED = new Node();
// 表示节点在独占模式下等待
static final Node EXCLUSIVE = null;
// 表示线程已取消的 waitStatus 值
static final int CANCELLED = 1;
// 表示后继线程需要 unparking 的 waitStatus 值
static final int SIGNAL = -1;
// 表示线程在条件等待的 waitStatus 值
static final int CONDITION = -2;
// 指示下一个 acquireShared的waitStatus 值应该无条件传播
static final int PROPAGATE = -3;
/**
* 状态指定,取如下值:
* SIGNAL: 当前节点的后继节点被阻塞(通过park),所以当
* 前节点释放或取消的时候需要unpark后继节点
* CANCELLED: 当前节点由于超时或中断被取消
* CONDITION: 当前节点在条件队列中
* PROPAGATE: releaseShared 操作需要传播给其他节点
* 0: 以上都不是
* 值以数字的方式排列是为了简化使用.
* 对于普通的同步节点,该字段的初始值为0,condition 节点初始值
* 为 CONDITION
*/
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
/**
* 指向下一个条件等待的节点或特定值 SHARED. 因为条件队列仅出在
* 独占模式,当节点处于条件等待时,我们仅需要一个简单的链表保存
* 节点. 随后它们会被转移到 re-acquire 队列中. 因为条件等待必
* 须是独占的,所以使用这个字段表示共享模式(见 isShared() 的判
* 断逻辑).
*/
Node nextWaiter;
}
AbstractQueuedSynchronizer的主要属性如下:
1
2
3
4
5
6
7
8
9
10
11/**
* 等待队列的头结点,延迟初始化. 除了初始化,其更新操作都通过setHead方法.
* 如果头结点存在,其状态保证不是 CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾节点. 只有在添加新节点时通过 enq 进行更新
*/
private transient volatile Node tail;
// 同步状态
private volatile int state;CLH队列的入队操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 尾节点为空,则表示队列为空,先初始化头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}为当前线程创建节点并插入等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试最快的方式入队,失败以后在执行完整的入队操作
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}private void unparkSuccessor(Node node) 唤醒后继节点,找到当前节点后继节点中第一个状态不为取消状态的节点进行唤醒
- private void doReleaseShared() 共享模式的释放操作
- private void cancelAcquire(Node node) 取消正在进行的获取尝试
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 检查和更新未能获取的节点的状态.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 当前节点已经设置状态,请求当前驱节点释放的时候通知它,因此可以安全的
* park当前节点
*/
return true;
if (ws > 0) {
/*
* 前驱节点被取消了,跳过所有被取消的前驱节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus 必须为0或 PROPAGATE,表示我们需要通知,但是不进行
* park,调用者需要进行重试,确保在park之前不会进行获取
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}public final void acquire(int arg) 独占式获取操作,忽略中断,首先尝试获取,获取失败后将当前节点插入到等待队列,然后在循环中自旋获取,在获取过程中如果前驱节点的状态为 SIGNAL,则park当前线程,等待被唤醒,唤醒以后重新自旋获取
1
2
3
4
5public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}public final void acquireInterruptibly(int arg) 独占式获取,响应中断,如果在获取的过程中线程中断直接抛出InterruptedException
- public final boolean tryAcquireNanos(int arg, long nanosTimeout) 超时获取,响应中断,超时失败返回
- public final boolean release(int arg) 独占模式的释放操作
- public final void acquireShared(int arg) 共享模式获取,不响应中断
- public final void acquireSharedInterruptibly(int arg) 共享模式获取,响应中断
- public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 共享模式获取,响应中断,响应超时
- public final boolean releaseShared(int arg) 共享模式释放操作
ConditionObject 实现的实现 ReentrantLock 的 condition 就是基于此实现的,condition 的 wait、signal 操作必须在持有锁的情况下进行
condition 队列,condition 队列由以下两个应用标识头尾,并由 Node 的 nextWaiter 进行连接,condition 队列中的状态都是 Node.CONDITION
1
2
3
4// condition 队列的第一个节点
private transient Node firstWaiter;
// condition 队列的最后一个节点
private transient Node lastWaiter;public final void await() wait 操作,主要由以下步骤组成
- 将当前线程添加到 condition 对于列
- 释放当前线程持有的锁
- 循环判断当前节点是否在同步等待队列中,如果不在同步等待队列中则执行 LockSupport.park 操作挂起当前线程
- 当前线程被唤醒且关联的节点在同步等待队列中则执行后续操作
- 重新去获取锁,并继续往后执行
- public final void signal() signal 操作
- 执行 signal 的线程必须独占的持有锁
- 将 condition 队列中的第一个节点移动到同步等待队列的尾部